// Copyright (c) 2020-present,  INSPUR Co, Ltd.  All rights reserved.
// This source code is licensed under Apache 2.0 License.

#pragma once
#include <db/memtable.h>
#include <list>

#include "arena_handle_event.h"
#include "include/rocksdb/memtablerep.h"
#include "memtable/inlineskiplist.h"
#include "pure_mem/rangearena/unlimited_space_arena.h"
#include "range_arena.h"
#include "range_arena_rebuild.h"
#include "util/allocator.h"

namespace rocksdb {

// Tuning constants for the range-arena memory blocks.
// NOTE(review): the units are not visible in this header. The "Size" values
// are presumably byte counts and the "Per" values presumably percentages --
// confirm against the code that consumes them.

// 64 << 20 == 67,108,864 (64 MiB if interpreted as bytes).
static constexpr size_t kMemBlockMaxSize = 64 << 20;
// Presumably the growth step applied to a memory block -- TODO confirm.
static constexpr size_t kMemBlockGrowPer = 20;
// Presumably the share of a memory block that is kept in reserve -- TODO
// confirm.
static constexpr size_t kMemBlockReservedPer = 20;
// Presumably the deleted-space threshold for a memory block -- TODO confirm.
static constexpr size_t kMemBlockDeletedPer = 30;
// 10 << 20 == 10,485,760 (10 MiB if interpreted as bytes).
static constexpr size_t kBlockMinSize = 10 << 20;

// Defined elsewhere; only pointers to it are used in this header.
class PureMemRep;
// Key comparator that orders length-prefixed keys by unsigned, byte-wise
// comparison of the decoded key bytes. When one key is a strict prefix of the
// other, the shorter key sorts first.
//
// Only the overloads that take no SequenceNumber are implemented. The
// sequence-aware overloads are not expected to be called: they assert(false)
// and, when assertions are compiled out, return 0.
class MetaKeyComparator : public MemTableRep::KeyComparator {
 public:
  using DecodedType = rocksdb::Slice;

  // Returns the key bytes stored behind the length prefix of `key`.
  DecodedType decode_key(const char* key) const override {
    // The format of key is frozen and can be treated as a part of the API
    // contract. Refer to MemTable::Add for details.
    return GetLengthPrefixedSlice(key);
  }

  // Compares two length-prefixed keys.
  // Returns -1 if key1 < key2, 0 if they are equal, and 1 if key1 > key2.
  // Bytes are compared as unsigned values over the common prefix; if the
  // common prefix is identical, the shorter key is the smaller one.
  int operator()(const char* prefix_len_key1,
                 const char* prefix_len_key2) const override {
    const Slice k1 = GetLengthPrefixedSlice(prefix_len_key1);
    const Slice k2 = GetLengthPrefixedSlice(prefix_len_key2);

    // Keep the length and the index in size_t. Narrowing to int would
    // truncate (or turn negative) for very large keys and silently skip part
    // of the comparison.
    const size_t common_len = std::min(k1.size(), k2.size());
    for (size_t i = 0; i < common_len; ++i) {
      const uint8_t byte1 = static_cast<uint8_t>(k1[i]);
      const uint8_t byte2 = static_cast<uint8_t>(k2[i]);
      if (byte1 > byte2) {
        return 1;
      }
      if (byte1 < byte2) {
        return -1;
      }
    }

    // Common prefix is identical: order by length.
    if (k1.size() > k2.size()) return 1;
    if (k1.size() < k2.size()) return -1;
    return 0;
  }

  // Compares a length-prefixed key against an already-decoded key.
  // The result is whatever Slice::compare returns for (decoded key, key).
  int operator()(const char* prefix_len_key, const Slice& key) const override {
    Slice key_pre = GetLengthPrefixedSlice(prefix_len_key);
    return key_pre.compare(key);
  }

  // The sequence-number-aware overloads below are never expected to be
  // called on this comparator; each one asserts and returns 0.
  int operator()(const Slice &key1, const Slice &key2,
                 SequenceNumber seq1, SequenceNumber seq2) const override {
    assert(false);
    return 0;
  }

  int operator()(const Slice &key, SequenceNumber seq,
                 const char *prefix_len_key) const override {
    assert(false);
    return 0;
  }

  int operator()(const Slice &key_a, SequenceNumber seq,
                 const Slice &key_b) const override {
    assert(false);
    return 0;
  }

  ~MetaKeyComparator() override = default;
};

// Defined elsewhere; this header only holds it through std::unique_ptr.
class MultiRangeManager;

// Allocator implementation associated with a PureMemRep. It keeps a skip list
// ordered by MetaKeyComparator (backed by an UnlimitedArena), a thread-safe
// queue of RangeArena pointers, and a MultiRangeManager.
//
// Most member functions are only declared here; their behaviour is defined in
// the implementation file. The class is non-copyable.
class MultiRangeArena : public Allocator {
 public:
  // Declared here, defined out of line. Takes the owning memtable
  // representation and the logger later returned by GetLog().
  explicit MultiRangeArena(PureMemRep* table, Logger* info_log);

  // Copying is disabled.
  MultiRangeArena(const MultiRangeArena&) = delete;
  MultiRangeArena& operator=(const MultiRangeArena&) = delete;

  ~MultiRangeArena() override {}

  // Key-aware allocation entry point (defined out of line).
  // NOTE(review): `rangearena` looks like an out-parameter receiving the
  // RangeArena that served the request -- confirm in the implementation.
  char* Allocate(size_t bytes, const Slice& userkey, void* node,
                 void** rangearena) override;

  // Defined out of line.
  // NOTE(review): presumably invoked after a successful allocation to record
  // the new entry in `rangearena` -- confirm in the implementation.
  static void AllocateOK(const Slice& key, uint32_t encoded_len,
                         const char* handle, void* rangearena);

  // Defined out of line.
  // NOTE(review): presumably releases `len` bytes belonging to `key` inside
  // `rangearena` -- confirm in the implementation.
  void* Delete(const Slice& key, size_t len, void* rangearena,
               void* node = nullptr);

  // Aligned allocation is not provided by this allocator: always returns
  // nullptr and ignores every argument.
  char* AllocateAligned(size_t bytes, size_t huge_page_size,
                        Logger* logger) override {
    return nullptr;
  }

  // Always reports a block size of 0.
  size_t BlockSize() const override {
    return 0;
  }

  // Bookkeeping operations on RangeArena instances, looked up by user key.
  // All of them are defined out of line.
  void RangeArenaAdd(const Slice& userKey, RangeArena* addr);
  RangeArena* SearchBlock(const Slice& userkey);
  void RangeArenaChange(const Slice& userkey, RangeArena* new_addr);
  void RangeArenaClear();

  // Accessors for the stored logger, the stored PureMemRep pointer and the
  // queue of RangeArena pointers.
  Logger* GetLog() { return info_log_; }
  PureMemRep* PMemRep() { return pmem_rep_; }
  ThreadSafeQueue<RangeArena*>& RangeArenaQue() { return range_arenas_; }

 private:
  PureMemRep* pmem_rep_;
  Logger* info_log_;

  // Declaration order matters here: members are initialized in declaration
  // order, and skip_list_'s initializer refers to mkey_cmp_ and arena, so
  // both must stay declared before it.
  MetaKeyComparator mkey_cmp_;
  UnlimitedArena arena;
  InlineSkipList<const MemTableRep::KeyComparator&> skip_list_{mkey_cmp_,
                                                               &arena};

  ThreadSafeQueue<RangeArena*> range_arenas_;
  std::unique_ptr<MultiRangeManager> event_manager_;
};

}  // namespace rocksdb
